package org.niit.service

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.streaming.dstream.DStream
import org.niit.common.TService
import org.niit.bean.AdClickData
import org.niit.util.SparkUtil
//Analyse the movie-rating data set: top-10 movies by average score,
//restricted to movies rated more than 200 times.
class MovieDataService extends TService {

  //Shared SparkSession obtained from the project utility.
  private val spark = SparkUtil.takeSpark()

  /**
   * Runs the rating analysis twice — once with Spark SQL and once with the
   * DataFrame DSL — and prints both result tables to stdout.
   *
   * NOTE(review): the streaming `data` argument is required by the TService
   * interface but is not used by this batch analysis.
   */
  override def dataAnalysis(data: DStream[AdClickData]): Any = {
    import spark.implicits._ //enables Dataset ops and the $"..." column syntax

    //1. Load the raw ratings file. Per the original notes the tab-separated
    //   fields are: userId, movieId, score, timestamp.
    val rawLines: Dataset[String] = spark.read.textFile("input/rating_100k.data")

    //2. Keep only (movieId, score); userId and the timestamp are dropped.
    val ratings: DataFrame = rawLines
      .map { line =>
        val fields = line.split("\t")
        (fields(1), fields(2).toInt)
      }
      .toDF("movieId", "score")
    /*
        movieId  score
          196      4
          196      2
          196      3
     */

    //3. Register a temporary view so the data can be queried with SQL.
    ratings.createOrReplaceTempView("t_movies")

    //4. SQL variant: average score per movie, kept only when the movie has
    //   more than 200 ratings, ordered by average descending, top 10.
    val sql =
      """
        |select movieId,avg(score) as avgScore,count(*) as count
        |from t_movies
        |group by movieId
        |having count > 200
        |order by avgScore desc
        |limit 10
        |""".stripMargin
    /*
       movieId  avgScore    count
         194     4.46        213
     */
    spark.sql(sql).show()

    //DSL variant: the same query expressed with DataFrame operations.
    import org.apache.spark.sql.functions._

    ratings
      .groupBy($"movieId")
      .agg(
        avg($"score").as("avgScore"),
        count("movieId").as("count")
      )
      .filter("count > 200")
      .orderBy($"avgScore".desc)
      .limit(10)
      .show()
  }
}
